/* Copyright (c) 2015,2016 Lucky Byte, Inc.
 */
package com.lucky_byte.pay.sched;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.lucky_byte.pay.jar.Jdbc;
import com.lucky_byte.pay.jar.JdbcRecord;
import com.lucky_byte.pay.jar.JdbcTable;
import com.lucky_byte.pay.jar.Runtime;

public class Main
{
	private static final Logger logger = LogManager.getLogger();
	private static Sched sched = Sched.getInstance();

	public static void main(String[] args) {
		logger.info("启动定时任务调度服务...");

		Runtime.setNotifySender("任务调度服务");

		java.lang.Runtime.getRuntime().addShutdownHook(
				new Thread(new Runnable() {
					public void run() {
						logger.info("定时任务调度服务开始退出...");
						sched.shutdown();
					}
				}));
		schedAll();
		listenDBNotifications();
	}

	/**
	 * 启动任务调度
	 */
	private static void schedAll() {
		JdbcTable table = JdbcTable.listBy_NE("pay_sched", "status = 1", null);
		for (int i = 0; i < table.getRecordCount(); i++) {
			JdbcRecord record = table.getRecord(i);
			logger.info("调度任务[{}].", record.getString("name"));
			if (!sched.sched(record)) {
				logger.error("调度定时任务[{}]失败.", record.getString("name"));
			}
		}
	}

	/**
	 * 监听数据库变动
	 */
	private static void listenDBNotifications() {
		try {
			Connection conn = Jdbc.getConnection();
			PGConnection pgconn = (PGConnection) conn;
			Statement stmt = conn.createStatement();
			stmt.execute("listen pay_sched");
			stmt.close();
			while (true) {
				PGNotification notifications[] = pgconn.getNotifications();
				if (notifications != null) {
					for (int i = 0; i < notifications.length; i++) {
						logger.info("监听到定时任务配置变更，开始处理...");
						respNotification(notifications[i]);
					}
				}
				try {
					Thread.sleep(3000);
				} catch (InterruptedException e) {
					logger.info("DB notification listener been interrupted.");
					break;
				}
			}
		} catch (SQLException e) {
			logger.error("SQL error: {}[{}]", e.getMessage(),
					e.getClass().getSimpleName());
			return;
		}
	}

	/**
	 * 处理数据库通知
	 */
	private static void respNotification(PGNotification notification) {
		if (!notification.getName().equals("pay_sched")) {
			logger.fatal("数据库通知 channel 名应该为 pay_sched.");
			return;
		}
		String parameter = notification.getParameter();
		if (parameter == null) {
			logger.warn("数据库通知参数为空，请检查...");
			return;
		}
		logger.trace("数据库通知参数[{}].", parameter);

		Map<String, String> params;
		try {
			params = new Gson().fromJson(parameter,
				new TypeToken<HashMap<String, String>>() {}.getType());
			if (params == null) {
				logger.warn("数据库通知参数[{}]无效，请检查...", parameter);
				return;
			}
		} catch (Exception e) {
			logger.warn("数据库通知参数[{}]不是有效的 JSON 格式.", parameter);
			return;
		}
		switch (params.get("action")) {
		case "add":
			respNotificationAdd(params);
			break;
		case "modify":
			respNotificationModify(params);
			break;
		case "delete":
			logger.info("任务[{}]被用户删除，取消调度...", params.get("serial"));
			sched.unsched(params.get("serial"));
			break;
		case "trigger":
			respNotificationTrigger(params);
			break;
		}
	}

	/**
	 * 新增调度任务
	 */
	private static void respNotificationAdd(Map<String, String> params) {
		JdbcTable table = JdbcTable.listBy1_NE(
				"pay_sched", "serial = ?", params.get("serial"));
		if (table == null || table.getRecordCount() == 0) {
			logger.error("未找到序号为[{}]的定时任务配置.", params.get("serial"));
			return;
		}
		JdbcRecord record = table.getRecord(0);
		if (record.getInteger("status") == 0) {
			logger.debug("新增任务为未调度状态，暂时不调度.");
			return;
		}
		logger.info("用户新增调度任务[{}].", record.getString("name"));
		sched.sched(record);
	}

	/**
	 * 修改调度任务
	 */
	private static void respNotificationModify(Map<String, String> params) {
		JdbcTable table = JdbcTable.listBy1_NE(
				"pay_sched", "serial = ?", params.get("serial"));
		if (table == null || table.getRecordCount() == 0) {
			logger.error("未找到序号为[{}]的定时任务配置.", params.get("serial"));
			return;
		}
		JdbcRecord record = table.getRecord(0);
		if (record.getInteger("status") == 0) {
			logger.info("任务[{}]被用户改为未调度状态，取消之前可能存在的调度...",
					record.getString("name"));
			sched.unsched(params.get("serial"));
		} else {
			logger.info("任务[{}]被用户改为调度状态，取消之前可能存在的调度后，重新使用新参数调度...",
					record.getString("name"));
			sched.unsched(params.get("serial"));
			sched.sched(record);
		}
	}

	/**
	 * 立即触发调度任务
	 */
	private static void respNotificationTrigger(Map<String, String> params) {
		JdbcTable table = JdbcTable.listBy1_NE(
				"pay_sched", "serial = ?", params.get("serial"));
		if (table == null || table.getRecordCount() == 0) {
			logger.error("未找到序号为[{}]的定时任务配置.", params.get("serial"));
			return;
		}
		JdbcRecord record = table.getRecord(0);
		logger.info("用户请求立即触发任务[{}].", record.getString("name"));
		sched.trigger(record);
	}

}
